package schedule.zk;

import com.google.common.base.Throwables;
import com.sun.org.apache.bcel.internal.generic.NEW;
import lombok.extern.slf4j.Slf4j;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.KeeperException;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import schedule.zk.exception.ZkException;

import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Thin wrapper around a Curator {@link CuratorFramework} that connects to ZooKeeper,
 * reads node data, and rebuilds the connection after a connection-loss error.
 *
 * <p>Connection settings are injected from the {@code zk_url},
 * {@code zk_schedule_node_url} and {@code zk_maxWaitTime} properties.
 *
 * @author liwenguang
 * @since 2018/4/28
 */
@Component("ZkClient")
@Slf4j
public class ZkClient {

    @Value("${zk_url}")
    private String hosts;

    @Value("${zk_schedule_node_url}")
    private String namespace;

    /** Maximum time, in seconds, to block while waiting for the connection. */
    @Value("${zk_maxWaitTime}")
    private int maxWaitTime;

    private volatile boolean started;

    // Retry policy: exponential backoff, 1000 ms base sleep, at most 3 retries.
    private static final ExponentialBackoffRetry retryStrategy = new ExponentialBackoffRetry(1000, 3);

    // Volatile because it is replaced in doStart() and read without locking in get().
    private volatile CuratorFramework curatorFramework;

    // Shared by all callers of restart(); a per-call lock would never exclude anybody.
    private final Lock restartLock = new ReentrantLock();

    /**
     * Connects to ZooKeeper unless a previous call already started the client.
     *
     * @throws ZkException if the calling thread is interrupted while waiting for the connection
     */
    public void start() {
        if (started) {
            return;
        }
        doStart();
    }

    /**
     * Builds and starts a new Curator client, then blocks until it is connected or
     * {@code maxWaitTime} seconds have elapsed.
     *
     * @throws ZkException if the calling thread is interrupted while waiting
     */
    private void doStart() {
        // Release any earlier client (e.g. left over from an interrupted attempt)
        // so that rebuilding never leaks a connection.
        CuratorFramework previous = curatorFramework;
        if (previous != null) {
            previous.close();
        }

        CuratorFramework client = CuratorFrameworkFactory.builder()
                .connectString(hosts)
                .namespace(namespace)
                .retryPolicy(retryStrategy)
                .build();
        curatorFramework = client;

        client.start();

        try {
            // Blocking connect, bounded by maxWaitTime seconds.
            boolean connected = client.blockUntilConnected(maxWaitTime, TimeUnit.SECONDS);
            if (!connected) {
                // The client keeps retrying in the background; surface the timeout instead of hiding it.
                log.warn("zk client not connected to {} within {}s.", hosts, maxWaitTime);
            }
            started = true;
        } catch (InterruptedException e) {
            // Preserve the interrupt status for code further up the stack.
            Thread.currentThread().interrupt();
            throw new ZkException(e);
        }
    }

    /**
     * Rebuilds the ZooKeeper connection if the current client is missing or disconnected.
     *
     * <p>Only one thread restarts at a time. A caller that cannot obtain the restart lock
     * within 30 seconds, or is interrupted while waiting for it, returns without restarting.
     */
    public void restart() {
        boolean locked = false;
        try {
            // Wait at most 30s for the restart lock.
            locked = restartLock.tryLock(30, TimeUnit.SECONDS);
            if (!locked) {
                log.warn("timeout to get the restart lock, maybe it's locked by another.");
                return;
            }

            // Check for null before dereferencing: restart() may run before start().
            CuratorFramework current = curatorFramework;
            if (current != null && current.getZookeeperClient().isConnected()) {
                return;
            }

            // doStart() closes the old connection before creating the new one.
            doStart();

        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            log.error("failed to get the restart lock, cause: {}", Throwables.getStackTraceAsString(e));
        } finally {
            // Unlocking a lock this thread does not hold throws IllegalMonitorStateException.
            if (locked) {
                restartLock.unlock();
            }
        }
    }

    /**
     * Reads the data stored at {@code path} and decodes it as UTF-8.
     *
     * @param path node path, relative to the configured namespace
     * @return the decoded node data, or {@code null} if the node holds no data
     * @throws ZkException if the read fails
     */
    public String getString(String path) {
        byte[] data = get(path);
        return data == null ? null : new String(data, StandardCharsets.UTF_8);
    }

    /**
     * Reads the raw data stored at {@code path}.
     *
     * <p>On a connection-loss error a restart is attempted before the failure is rethrown;
     * the read itself is not retried.
     *
     * @param path node path, relative to the configured namespace
     * @return the node data as returned by Curator
     * @throws ZkException wrapping any exception raised by the read
     */
    public byte[] get(String path) {
        try {
            return curatorFramework.getData().forPath(path);
        } catch (Exception e) {
            handleConnectionLoss(e);
            throw new ZkException(e);
        }
    }

    /**
     * Restarts the client when {@code e} is a ZooKeeper connection-loss error.
     * Intended to be called from the exception path of node operations.
     *
     * @param e the exception raised by a ZooKeeper operation
     */
    private void handleConnectionLoss(Exception e) {
        if (e instanceof KeeperException.ConnectionLossException) {

            log.warn("zk client will restart...");

            // try to restart the zk connection
            restart();

            log.warn("zk client do restart finished.");
        }
    }
}